package org.example;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;

public class ReadFileCreateTableStream {
    public static void main(String[] args) throws Exception {
        //1、流式环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2、连接外部文件系统，格式，注册字段，临时表
        tEnv.connect(new FileSystem().path("D:\\test\\a.txt"))
                .withFormat(new OldCsv())
                .withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT()))
                .inAppendMode()
                .createTemporaryTable("Orders");

        //3、读取表
        Table orders = tEnv.from("Orders");

        //4、读取表字段
        Table counts = orders.select($("name"),$("age"));

        //5、转化成DataStream打印在控制台
        DataStream<Row> rowDataStream = tEnv.toAppendStream(counts, Row.class);
        rowDataStream.print();

        env.execute("readFileCreateTableStream");
    }
}
